import json

from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:/python/python.exe"
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)

# TODO 需求1：城市销售额排名
# 1.1 读取文件得到RDD
file_rdd=sc.textFile("D:/桌面/js/资料/第15章资料/资料/orders.txt")
# 1.2 取出一个个JSON字符串
json_str_rdd=file_rdd.flatMap(lambda x:x.split("|"))
# 1.3 将一个个JSON字符串转换为字典
dict_rdd=json_str_rdd.map(lambda x:json.loads(x))
# 1.4 取出城市和销售额数据
#(城市，销售额)
city_with_money_rdd=dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd=city_with_money_rdd.reduceByKey(lambda a,b:a+b)
# 1.6 按销售额聚合结果进行排序
result1_rdd=city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print("需求1的结果：",result1_rdd.collect())

# TODO 需求二：全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
category_rdd=dict_rdd.map(lambda x:x['category']).distinct()
print("需求2的结果：",category_rdd.collect())
# 2.2对全部商品类别进行去重
# TODO 需求3：北京市有哪些商品类别在售卖
#3.1 过滤北京市的数据
beijing_data_rdd=dict_rdd.filter(lambda x:x['areaName']=='北京')
# 3。2取出全部商品类别
result3_rdd=beijing_data_rdd.map(lambda x:x['category']).distinct()
print("需求3的结果：",result3_rdd.collect())
# 3.3进行商品类别去重